package com.jse.util.concurrent.douglea.design;

/**
 * jdk9新特性 TODO
 * @author Administrator
 *
 */
public class SubmissionPublisherTest extends JSR166TestCase {

//    public static void main(String[] args) {
//        main(suite(), args);
//    }
//    public static Test suite() {
//        return new TestSuite(SubmissionPublisherTest.class);
//    }
//
//    final Executor basicExecutor = basicPublisher().getExecutor();
//
//    static SubmissionPublisher<Integer> basicPublisher() {
//        return new SubmissionPublisher<Integer>();
//    }
//
//    static class SPException extends RuntimeException {}
//
//    class TestSubscriber implements Subscriber<Integer> {
//        volatile Subscription sn;
//        int last;  // Requires that onNexts are in numeric order
//        volatile int nexts;
//        volatile int errors;
//        volatile int completes;
//        volatile boolean throwOnCall = false;
//        volatile boolean request = true;
//        volatile Throwable lastError;
//
//        public synchronized void onSubscribe(Subscription s) {
//            threadAssertTrue(sn == null);
//            sn = s;
//            notifyAll();
//            if (throwOnCall)
//                throw new SPException();
//            if (request)
//                sn.request(1L);
//        }
//        public synchronized void onNext(Integer t) {
//            ++nexts;
//            notifyAll();
//            int current = t.intValue();
//            threadAssertTrue(current >= last);
//            last = current;
//            if (request)
//                sn.request(1L);
//            if (throwOnCall)
//                throw new SPException();
//        }
//        public synchronized void onError(Throwable t) {
//            threadAssertTrue(completes == 0);
//            threadAssertTrue(errors == 0);
//            lastError = t;
//            ++errors;
//            notifyAll();
//        }
//        public synchronized void onComplete() {
//            threadAssertTrue(completes == 0);
//            ++completes;
//            notifyAll();
//        }
//
//        synchronized void awaitSubscribe() {
//            while (sn == null) {
//                try {
//                    wait();
//                } catch (Exception ex) {
//                    threadUnexpectedException(ex);
//                    break;
//                }
//            }
//        }
//        synchronized void awaitNext(int n) {
//            while (nexts < n) {
//                try {
//                    wait();
//                } catch (Exception ex) {
//                    threadUnexpectedException(ex);
//                    break;
//                }
//            }
//        }
//        synchronized void awaitComplete() {
//            while (completes == 0 && errors == 0) {
//                try {
//                    wait();
//                } catch (Exception ex) {
//                    threadUnexpectedException(ex);
//                    break;
//                }
//            }
//        }
//        synchronized void awaitError() {
//            while (errors == 0) {
//                try {
//                    wait();
//                } catch (Exception ex) {
//                    threadUnexpectedException(ex);
//                    break;
//                }
//            }
//        }
//
//    }
//
//    /**
//     * A new SubmissionPublisher has no subscribers, a non-null
//     * executor, a power-of-two capacity, is not closed, and reports
//     * zero demand and lag
//     */
//    void checkInitialState(SubmissionPublisher<?> p) {
//        assertFalse(p.hasSubscribers());
//        assertEquals(0, p.getNumberOfSubscribers());
//        assertTrue(p.getSubscribers().isEmpty());
//        assertFalse(p.isClosed());
//        assertNull(p.getClosedException());
//        int n = p.getMaxBufferCapacity();
//        assertTrue((n & (n - 1)) == 0); // power of two
//        assertNotNull(p.getExecutor());
//        assertEquals(0, p.estimateMinimumDemand());
//        assertEquals(0, p.estimateMaximumLag());
//    }
//
//    /**
//     * A default-constructed SubmissionPublisher has no subscribers,
//     * is not closed, has default buffer size, and uses the
//     * defaultExecutor
//     */
//    public void testConstructor1() {
//        SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>();
//        checkInitialState(p);
//        assertEquals(p.getMaxBufferCapacity(), Flow.defaultBufferSize());
//        Executor e = p.getExecutor(), c = ForkJoinPool.commonPool();
//        if (ForkJoinPool.getCommonPoolParallelism() > 1)
//            assertSame(e, c);
//        else
//            assertNotSame(e, c);
//    }
//
//    /**
//     * A new SubmissionPublisher has no subscribers, is not closed,
//     * has the given buffer size, and uses the given executor
//     */
//    public void testConstructor2() {
//        Executor e = Executors.newFixedThreadPool(1);
//        SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(e, 8);
//        checkInitialState(p);
//        assertSame(p.getExecutor(), e);
//        assertEquals(8, p.getMaxBufferCapacity());
//    }
//
//    /**
//     * A null Executor argument to SubmissionPublisher constructor throws NPE
//     */
//    public void testConstructor3() {
//        try {
//            new SubmissionPublisher<Integer>(null, 8);
//            shouldThrow();
//        } catch (NullPointerException success) {}
//    }
//
//    /**
//     * A negative capacity argument to SubmissionPublisher constructor
//     * throws IAE
//     */
//    public void testConstructor4() {
//        Executor e = Executors.newFixedThreadPool(1);
//        try {
//            new SubmissionPublisher<Integer>(e, -1);
//            shouldThrow();
//        } catch (IllegalArgumentException success) {}
//    }
//
//    /**
//     * A closed publisher reports isClosed with no closedException and
//     * throws ISE upon attempted submission; a subsequent close or
//     * closeExceptionally has no additional effect.
//     */
//    public void testClose() {
//        SubmissionPublisher<Integer> p = basicPublisher();
//        checkInitialState(p);
//        p.close();
//        assertTrue(p.isClosed());
//        assertNull(p.getClosedException());
//        try {
//            p.submit(1);
//            shouldThrow();
//        } catch (IllegalStateException success) {}
//        Throwable ex = new SPException();
//        p.closeExceptionally(ex);
//        assertTrue(p.isClosed());
//        assertNull(p.getClosedException());
//    }
//
//    /**
//     * A publisher closedExceptionally reports isClosed with the
//     * closedException and throws ISE upon attempted submission; a
//     * subsequent close or closeExceptionally has no additional
//     * effect.
//     */
//    public void testCloseExceptionally() {
//        SubmissionPublisher<Integer> p = basicPublisher();
//        checkInitialState(p);
//        Throwable ex = new SPException();
//        p.closeExceptionally(ex);
//        assertTrue(p.isClosed());
//        assertSame(p.getClosedException(), ex);
//        try {
//            p.submit(1);
//            shouldThrow();
//        } catch (IllegalStateException success) {}
//        p.close();
//        assertTrue(p.isClosed());
//        assertSame(p.getClosedException(), ex);
//    }
//
//    /**
//     * Upon subscription, the subscriber's onSubscribe is called, no
//     * other Subscriber methods are invoked, the publisher
//     * hasSubscribers, isSubscribed is true, and existing
//     * subscriptions are unaffected.
//     */
//    public void testSubscribe1() {
//        TestSubscriber s = new TestSubscriber();
//        SubmissionPublisher<Integer> p = basicPublisher();
//        p.subscribe(s);
//        assertTrue(p.hasSubscribers());
//        assertEquals(1, p.getNumberOfSubscribers());
//        assertTrue(p.getSubscribers().contains(s));
//        assertTrue(p.isSubscribed(s));
//        s.awaitSubscribe();
//        assertNotNull(s.sn);
//        assertEquals(0, s.nexts);
//        assertEquals(0, s.errors);
//        assertEquals(0, s.completes);
//        TestSubscriber s2 = new TestSubscriber();
//        p.subscribe(s2);
//        assertTrue(p.hasSubscribers());
//        assertEquals(2, p.getNumberOfSubscribers());
//        assertTrue(p.getSubscribers().contains(s));
//        assertTrue(p.getSubscribers().contains(s2));
//        assertTrue(p.isSubscribed(s));
//        assertTrue(p.isSubscribed(s2));
//        s2.awaitSubscribe();
//        assertNotNull(s2.sn);
//        assertEquals(0, s2.nexts);
//        assertEquals(0, s2.errors);
//        assertEquals(0, s2.completes);
//        p.close();
//    }
//
//    /**
//     * If closed, upon subscription, the subscriber's onComplete
//     * method is invoked
//     */
//    public void testSubscribe2() {
//        TestSubscriber s = new TestSubscriber();
//        SubmissionPublisher<Integer> p = basicPublisher();
//        p.close();
//        p.subscribe(s);
//        s.awaitComplete();
//        assertEquals(0, s.nexts);
//        assertEquals(0, s.errors);
//        assertEquals(1, s.completes, 1);
//    }
//
//    /**
//     * If closedExceptionally, upon subscription, the subscriber's
//     * onError method is invoked
//     */
//    public void testSubscribe3() {
//        TestSubscriber s = new TestSubscriber();
//        SubmissionPublisher<Integer> p = basicPublisher();
//        Throwable ex = new SPException();
//        p.closeExceptionally(ex);
//        assertTrue(p.isClosed());
//        assertSame(p.getClosedException(), ex);
//        p.subscribe(s);
//        s.awaitError();
//        assertEquals(0, s.nexts);
//        assertEquals(1, s.errors);
//    }
//
//    /**
//     * Upon attempted resubscription, the subscriber's onError is
//     * called and the subscription is cancelled.
//     */
//    public void testSubscribe4() {
//        TestSubscriber s = new TestSubscriber();
//        SubmissionPublisher<Integer> p = basicPublisher();
//        p.subscribe(s);
//        assertTrue(p.hasSubscribers());
//        assertEquals(1, p.getNumberOfSubscribers());
//        assertTrue(p.getSubscribers().contains(s));
//        assertTrue(p.isSubscribed(s));
//        s.awaitSubscribe();
//        assertNotNull(s.sn);
//        assertEquals(0, s.nexts);
//        assertEquals(0, s.errors);
//        assertEquals(0, s.completes);
//        p.subscribe(s);
//        s.awaitError();
//        assertEquals(0, s.nexts);
//        assertEquals(1, s.errors);
//        assertFalse(p.isSubscribed(s));
//    }
//
//    /**
//     * An exception thrown in onSubscribe causes onError
//     */
//    public void testSubscribe5() {
//        TestSubscriber s = new TestSubscriber();
//        SubmissionPublisher<Integer> p = basicPublisher();
//        s.throwOnCall = true;
//        try {
//            p.subscribe(s);
//        } catch (Exception ok) {}
//        s.awaitError();
//        assertEquals(0, s.nexts);
//        assertEquals(1, s.errors);
//        assertEquals(0, s.completes);
//    }
//
//    /**
//     * subscribe(null) throws NPE
//     */
//    public void testSubscribe6() {
//        SubmissionPublisher<Integer> p = basicPublisher();
//        try {
//            p.subscribe(null);
//            shouldThrow();
//        } catch (NullPointerException success) {}
//        checkInitialState(p);
//    }
//
//    /**
//     * Closing a publisher causes onComplete to subscribers
//     */
//    public void testCloseCompletes() {
//        SubmissionPublisher<Integer> p = basicPublisher();
//        TestSubscriber s1 = new TestSubscriber();
//        TestSubscriber s2 = new TestSubscriber();
//        p.subscribe(s1);
//        p.subscribe(s2);
//        p.submit(1);
//        p.close();
//        assertTrue(p.isClosed());
//        assertNull(p.getClosedException());
//        s1.awaitComplete();
//        assertEquals(1, s1.nexts);
//        assertEquals(1, s1.completes);
//        s2.awaitComplete();
//        assertEquals(1, s2.nexts);
//        assertEquals(1, s2.completes);
//    }
//
//    /**
//     * Closing a publisher exceptionally causes onError to subscribers
//     */
//    public void testCloseExceptionallyError() {
//        SubmissionPublisher<Integer> p = basicPublisher();
//        TestSubscriber s1 = new TestSubscriber();
//        TestSubscriber s2 = new TestSubscriber();
//        p.subscribe(s1);
//        p.subscribe(s2);
//        p.submit(1);
//        p.closeExceptionally(new SPException());
//        assertTrue(p.isClosed());
//        s1.awaitError();
//        assertTrue(s1.nexts <= 1);
//        assertEquals(1, s1.errors);
//        s2.awaitError();
//        assertTrue(s2.nexts <= 1);
//        assertEquals(1, s2.errors);
//    }
//
//    /**
//     * Cancelling a subscription eventually causes no more onNexts to be issued
//     */
//    public void testCancel() {
//        SubmissionPublisher<Integer> p = basicPublisher();
//        TestSubscriber s1 = new TestSubscriber();
//        TestSubscriber s2 = new TestSubscriber();
//        p.subscribe(s1);
//        p.subscribe(s2);
//        s1.awaitSubscribe();
//        p.submit(1);
//        s1.sn.cancel();
//        for (int i = 2; i <= 20; ++i)
//            p.submit(i);
//        p.close();
//        s2.awaitComplete();
//        assertEquals(20, s2.nexts);
//        assertEquals(1, s2.completes);
//        assertTrue(s1.nexts < 20);
//        assertFalse(p.isSubscribed(s1));
//    }
//
//    /**
//     * Throwing an exception in onNext causes onError
//     */
//    public void testThrowOnNext() {
//        SubmissionPublisher<Integer> p = basicPublisher();
//        TestSubscriber s1 = new TestSubscriber();
//        TestSubscriber s2 = new TestSubscriber();
//        p.subscribe(s1);
//        p.subscribe(s2);
//        s1.awaitSubscribe();
//        p.submit(1);
//        s1.throwOnCall = true;
//        p.submit(2);
//        p.close();
//        s2.awaitComplete();
//        assertEquals(2, s2.nexts);
//        s1.awaitComplete();
//        assertEquals(1, s1.errors);
//    }
//
//    /**
//     * If a handler is supplied in constructor, it is invoked when
//     * subscriber throws an exception in onNext
//     */
//    public void testThrowOnNextHandler() {
//        AtomicInteger calls = new AtomicInteger();
//        SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>
//            (basicExecutor, 8,
//             (s, e) -> calls.getAndIncrement());
//        TestSubscriber s1 = new TestSubscriber();
//        TestSubscriber s2 = new TestSubscriber();
//        p.subscribe(s1);
//        p.subscribe(s2);
//        s1.awaitSubscribe();
//        p.submit(1);
//        s1.throwOnCall = true;
//        p.submit(2);
//        p.close();
//        s2.awaitComplete();
//        assertEquals(2, s2.nexts);
//        assertEquals(1, s2.completes);
//        s1.awaitError();
//        assertEquals(1, s1.errors);
//        assertEquals(1, calls.get());
//    }
//
//    /**
//     * onNext items are issued in the same order to each subscriber
//     */
//    public void testOrder() {
//        SubmissionPublisher<Integer> p = basicPublisher();
//        TestSubscriber s1 = new TestSubscriber();
//        TestSubscriber s2 = new TestSubscriber();
//        p.subscribe(s1);
//        p.subscribe(s2);
//        for (int i = 1; i <= 20; ++i)
//            p.submit(i);
//        p.close();
//        s2.awaitComplete();
//        s1.awaitComplete();
//        assertEquals(20, s2.nexts);
//        assertEquals(1, s2.completes);
//        assertEquals(20, s1.nexts);
//        assertEquals(1, s1.completes);
//    }
//
//    /**
//     * onNext is issued only if requested
//     */
//    public void testRequest1() {
//        SubmissionPublisher<Integer> p = basicPublisher();
//        TestSubscriber s1 = new TestSubscriber();
//        s1.request = false;
//        p.subscribe(s1);
//        s1.awaitSubscribe();
//        assertTrue(p.estimateMinimumDemand() == 0);
//        TestSubscriber s2 = new TestSubscriber();
//        p.subscribe(s2);
//        p.submit(1);
//        p.submit(2);
//        s2.awaitNext(1);
//        assertEquals(0, s1.nexts);
//        s1.sn.request(3);
//        p.submit(3);
//        p.close();
//        s2.awaitComplete();
//        assertEquals(3, s2.nexts);
//        assertEquals(1, s2.completes);
//        s1.awaitComplete();
//        assertTrue(s1.nexts > 0);
//        assertEquals(1, s1.completes);
//    }
//
//    /**
//     * onNext is not issued when requests become zero
//     */
//    public void testRequest2() {
//        SubmissionPublisher<Integer> p = basicPublisher();
//        TestSubscriber s1 = new TestSubscriber();
//        TestSubscriber s2 = new TestSubscriber();
//        p.subscribe(s1);
//        p.subscribe(s2);
//        s2.awaitSubscribe();
//        s1.awaitSubscribe();
//        s1.request = false;
//        p.submit(1);
//        p.submit(2);
//        p.close();
//        s2.awaitComplete();
//        assertEquals(2, s2.nexts);
//        assertEquals(1, s2.completes);
//        s1.awaitNext(1);
//        assertEquals(1, s1.nexts);
//    }
//
//    /**
//     * Negative request causes error
//     */
//    public void testRequest3() {
//        SubmissionPublisher<Integer> p = basicPublisher();
//        TestSubscriber s1 = new TestSubscriber();
//        TestSubscriber s2 = new TestSubscriber();
//        p.subscribe(s1);
//        p.subscribe(s2);
//        s2.awaitSubscribe();
//        s1.awaitSubscribe();
//        s1.sn.request(-1L);
//        p.submit(1);
//        p.submit(2);
//        p.close();
//        s2.awaitComplete();
//        assertEquals(2, s2.nexts);
//        assertEquals(1, s2.completes);
//        s1.awaitError();
//        assertEquals(1, s1.errors);
//        assertTrue(s1.lastError instanceof IllegalArgumentException);
//    }
//
//    /**
//     * estimateMinimumDemand reports 0 until request, nonzero after
//     * request, and zero again after delivery
//     */
//    public void testEstimateMinimumDemand() {
//        TestSubscriber s = new TestSubscriber();
//        SubmissionPublisher<Integer> p = basicPublisher();
//        s.request = false;
//        p.subscribe(s);
//        s.awaitSubscribe();
//        assertEquals(0, p.estimateMinimumDemand());
//        s.sn.request(1);
//        assertEquals(1, p.estimateMinimumDemand());
//        p.submit(1);
//        s.awaitNext(1);
//        assertEquals(0, p.estimateMinimumDemand());
//    }
//
//    /**
//     * submit to a publisher with no subscribers returns lag 0
//     */
//    public void testEmptySubmit() {
//        SubmissionPublisher<Integer> p = basicPublisher();
//        assertEquals(0, p.submit(1));
//    }
//
//    /**
//     * submit(null) throws NPE
//     */
//    public void testNullSubmit() {
//        SubmissionPublisher<Integer> p = basicPublisher();
//        try {
//            p.submit(null);
//            shouldThrow();
//        } catch (NullPointerException success) {}
//    }
//
//    /**
//     * submit returns number of lagged items, compatible with result
//     * of estimateMaximumLag.
//     */
//    public void testLaggedSubmit() {
//        SubmissionPublisher<Integer> p = basicPublisher();
//        TestSubscriber s1 = new TestSubscriber();
//        s1.request = false;
//        TestSubscriber s2 = new TestSubscriber();
//        s2.request = false;
//        p.subscribe(s1);
//        p.subscribe(s2);
//        s2.awaitSubscribe();
//        s1.awaitSubscribe();
//        assertEquals(1, p.submit(1));
//        assertTrue(p.estimateMaximumLag() >= 1);
//        assertTrue(p.submit(2) >= 2);
//        assertTrue(p.estimateMaximumLag() >= 2);
//        s1.sn.request(4);
//        assertTrue(p.submit(3) >= 3);
//        assertTrue(p.estimateMaximumLag() >= 3);
//        s2.sn.request(4);
//        p.submit(4);
//        p.close();
//        s2.awaitComplete();
//        assertEquals(4, s2.nexts);
//        s1.awaitComplete();
//        assertEquals(4, s2.nexts);
//    }
//
//    /**
//     * submit eventually issues requested items when buffer capacity is 1
//     */
//    public void testCap1Submit() {
//        SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
//            basicExecutor, 1);
//        TestSubscriber s1 = new TestSubscriber();
//        TestSubscriber s2 = new TestSubscriber();
//        p.subscribe(s1);
//        p.subscribe(s2);
//        for (int i = 1; i <= 20; ++i) {
//            assertTrue(p.estimateMinimumDemand() <= 1);
//            assertTrue(p.submit(i) >= 0);
//        }
//        p.close();
//        s2.awaitComplete();
//        s1.awaitComplete();
//        assertEquals(20, s2.nexts);
//        assertEquals(1, s2.completes);
//        assertEquals(20, s1.nexts);
//        assertEquals(1, s1.completes);
//    }
//
//    static boolean noopHandle(AtomicInteger count) {
//        count.getAndIncrement();
//        return false;
//    }
//
//    static boolean reqHandle(AtomicInteger count, Subscriber s) {
//        count.getAndIncrement();
//        ((TestSubscriber)s).sn.request(Long.MAX_VALUE);
//        return true;
//    }
//
//    /**
//     * offer to a publisher with no subscribers returns lag 0
//     */
//    public void testEmptyOffer() {
//        SubmissionPublisher<Integer> p = basicPublisher();
//        assertEquals(0, p.offer(1, null));
//    }
//
//    /**
//     * offer(null) throws NPE
//     */
//    public void testNullOffer() {
//        SubmissionPublisher<Integer> p = basicPublisher();
//        try {
//            p.offer(null, null);
//            shouldThrow();
//        } catch (NullPointerException success) {}
//    }
//
//    /**
//     * offer returns number of lagged items if not saturated
//     */
//    public void testLaggedOffer() {
//        SubmissionPublisher<Integer> p = basicPublisher();
//        TestSubscriber s1 = new TestSubscriber();
//        s1.request = false;
//        TestSubscriber s2 = new TestSubscriber();
//        s2.request = false;
//        p.subscribe(s1);
//        p.subscribe(s2);
//        s2.awaitSubscribe();
//        s1.awaitSubscribe();
//        assertTrue(p.offer(1, null) >= 1);
//        assertTrue(p.offer(2, null) >= 2);
//        s1.sn.request(4);
//        assertTrue(p.offer(3, null) >= 3);
//        s2.sn.request(4);
//        p.offer(4, null);
//        p.close();
//        s2.awaitComplete();
//        assertEquals(4, s2.nexts);
//        s1.awaitComplete();
//        assertEquals(4, s2.nexts);
//    }
//
//    /**
//     * offer reports drops if saturated
//     */
//    public void testDroppedOffer() {
//        SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
//            basicExecutor, 4);
//        TestSubscriber s1 = new TestSubscriber();
//        s1.request = false;
//        TestSubscriber s2 = new TestSubscriber();
//        s2.request = false;
//        p.subscribe(s1);
//        p.subscribe(s2);
//        s2.awaitSubscribe();
//        s1.awaitSubscribe();
//        for (int i = 1; i <= 4; ++i)
//            assertTrue(p.offer(i, null) >= 0);
//        p.offer(5, null);
//        assertTrue(p.offer(6, null) < 0);
//        s1.sn.request(64);
//        assertTrue(p.offer(7, null) < 0);
//        s2.sn.request(64);
//        p.close();
//        s2.awaitComplete();
//        assertTrue(s2.nexts >= 4);
//        s1.awaitComplete();
//        assertTrue(s1.nexts >= 4);
//    }
//
//    /**
//     * offer invokes drop handler if saturated
//     */
//    public void testHandledDroppedOffer() {
//        AtomicInteger calls = new AtomicInteger();
//        SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
//            basicExecutor, 4);
//        TestSubscriber s1 = new TestSubscriber();
//        s1.request = false;
//        TestSubscriber s2 = new TestSubscriber();
//        s2.request = false;
//        p.subscribe(s1);
//        p.subscribe(s2);
//        s2.awaitSubscribe();
//        s1.awaitSubscribe();
//        for (int i = 1; i <= 4; ++i)
//            assertTrue(p.offer(i, (s, x) -> noopHandle(calls)) >= 0);
//        p.offer(4, (s, x) -> noopHandle(calls));
//        assertTrue(p.offer(6, (s, x) -> noopHandle(calls)) < 0);
//        s1.sn.request(64);
//        assertTrue(p.offer(7, (s, x) -> noopHandle(calls)) < 0);
//        s2.sn.request(64);
//        p.close();
//        s2.awaitComplete();
//        s1.awaitComplete();
//        assertTrue(calls.get() >= 4);
//    }
//
//    /**
//     * offer succeeds if drop handler forces request
//     */
//    public void testRecoveredHandledDroppedOffer() {
//        AtomicInteger calls = new AtomicInteger();
//        SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
//            basicExecutor, 4);
//        TestSubscriber s1 = new TestSubscriber();
//        s1.request = false;
//        TestSubscriber s2 = new TestSubscriber();
//        s2.request = false;
//        p.subscribe(s1);
//        p.subscribe(s2);
//        s2.awaitSubscribe();
//        s1.awaitSubscribe();
//        int n = 0;
//        for (int i = 1; i <= 8; ++i) {
//            int d = p.offer(i, (s, x) -> reqHandle(calls, s));
//            n = n + 2 + (d < 0 ? d : 0);
//        }
//        p.close();
//        s2.awaitComplete();
//        s1.awaitComplete();
//        assertEquals(n, s1.nexts + s2.nexts);
//        assertTrue(calls.get() >= 2);
//    }
//
//    /**
//     * Timed offer to a publisher with no subscribers returns lag 0
//     */
//    public void testEmptyTimedOffer() {
//        SubmissionPublisher<Integer> p = basicPublisher();
//        long startTime = System.nanoTime();
//        assertEquals(0, p.offer(1, LONG_DELAY_MS, MILLISECONDS, null));
//        assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
//    }
//
//    /**
//     * Timed offer with null item or TimeUnit throws NPE
//     */
//    public void testNullTimedOffer() {
//        SubmissionPublisher<Integer> p = basicPublisher();
//        long startTime = System.nanoTime();
//        try {
//            p.offer(null, LONG_DELAY_MS, MILLISECONDS, null);
//            shouldThrow();
//        } catch (NullPointerException success) {}
//        try {
//            p.offer(1, LONG_DELAY_MS, null, null);
//            shouldThrow();
//        } catch (NullPointerException success) {}
//        assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
//    }
//
//    /**
//     * Timed offer returns number of lagged items if not saturated
//     */
//    public void testLaggedTimedOffer() {
//        SubmissionPublisher<Integer> p = basicPublisher();
//        TestSubscriber s1 = new TestSubscriber();
//        s1.request = false;
//        TestSubscriber s2 = new TestSubscriber();
//        s2.request = false;
//        p.subscribe(s1);
//        p.subscribe(s2);
//        s2.awaitSubscribe();
//        s1.awaitSubscribe();
//        long startTime = System.nanoTime();
//        assertTrue(p.offer(1, LONG_DELAY_MS, MILLISECONDS, null) >= 1);
//        assertTrue(p.offer(2, LONG_DELAY_MS, MILLISECONDS, null) >= 2);
//        s1.sn.request(4);
//        assertTrue(p.offer(3, LONG_DELAY_MS, MILLISECONDS, null) >= 3);
//        s2.sn.request(4);
//        p.offer(4, LONG_DELAY_MS, MILLISECONDS, null);
//        p.close();
//        s2.awaitComplete();
//        assertEquals(4, s2.nexts);
//        s1.awaitComplete();
//        assertEquals(4, s2.nexts);
//        assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
//    }
//
//    /**
//     * Timed offer reports drops if saturated
//     */
//    public void testDroppedTimedOffer() {
//        SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
//            basicExecutor, 4);
//        TestSubscriber s1 = new TestSubscriber();
//        s1.request = false;
//        TestSubscriber s2 = new TestSubscriber();
//        s2.request = false;
//        p.subscribe(s1);
//        p.subscribe(s2);
//        s2.awaitSubscribe();
//        s1.awaitSubscribe();
//        long delay = timeoutMillis();
//        for (int i = 1; i <= 4; ++i)
//            assertTrue(p.offer(i, delay, MILLISECONDS, null) >= 0);
//        long startTime = System.nanoTime();
//        assertTrue(p.offer(5, delay, MILLISECONDS, null) < 0);
//        s1.sn.request(64);
//        assertTrue(p.offer(6, delay, MILLISECONDS, null) < 0);
//        // 2 * delay should elapse but check only 1 * delay to allow timer slop
//        assertTrue(millisElapsedSince(startTime) >= delay);
//        s2.sn.request(64);
//        p.close();
//        s2.awaitComplete();
//        assertTrue(s2.nexts >= 2);
//        s1.awaitComplete();
//        assertTrue(s1.nexts >= 2);
//    }
//
//    /**
//     * Timed offer invokes drop handler if saturated
//     */
//    public void testHandledDroppedTimedOffer() {
//        AtomicInteger calls = new AtomicInteger();
//        SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
//            basicExecutor, 4);
//        TestSubscriber s1 = new TestSubscriber();
//        s1.request = false;
//        TestSubscriber s2 = new TestSubscriber();
//        s2.request = false;
//        p.subscribe(s1);
//        p.subscribe(s2);
//        s2.awaitSubscribe();
//        s1.awaitSubscribe();
//        long delay = timeoutMillis();
//        for (int i = 1; i <= 4; ++i)
//            assertTrue(p.offer(i, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0);
//        long startTime = System.nanoTime();
//        assertTrue(p.offer(5, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
//        s1.sn.request(64);
//        assertTrue(p.offer(6, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
//        assertTrue(millisElapsedSince(startTime) >= delay);
//        s2.sn.request(64);
//        p.close();
//        s2.awaitComplete();
//        s1.awaitComplete();
//        assertTrue(calls.get() >= 2);
//    }
//
//    /**
//     * Timed offer succeeds if drop handler forces request
//     */
//    public void testRecoveredHandledDroppedTimedOffer() {
//        AtomicInteger calls = new AtomicInteger();
//        SubmissionPublisher<Integer> p = new SubmissionPublisher<Integer>(
//            basicExecutor, 4);
//        TestSubscriber s1 = new TestSubscriber();
//        s1.request = false;
//        TestSubscriber s2 = new TestSubscriber();
//        s2.request = false;
//        p.subscribe(s1);
//        p.subscribe(s2);
//        s2.awaitSubscribe();
//        s1.awaitSubscribe();
//        int n = 0;
//        long delay = timeoutMillis();
//        long startTime = System.nanoTime();
//        for (int i = 1; i <= 6; ++i) {
//            int d = p.offer(i, delay, MILLISECONDS, (s, x) -> reqHandle(calls, s));
//            n = n + 2 + (d < 0 ? d : 0);
//        }
//        assertTrue(millisElapsedSince(startTime) >= delay);
//        p.close();
//        s2.awaitComplete();
//        s1.awaitComplete();
//        assertEquals(n, s1.nexts + s2.nexts);
//        assertTrue(calls.get() >= 2);
//    }
//
//    /**
//     * consume returns a CompletableFuture that is done when
//     * publisher completes
//     */
//    public void testConsume() {
//        AtomicInteger sum = new AtomicInteger();
//        SubmissionPublisher<Integer> p = basicPublisher();
//        CompletableFuture<Void> f =
//            p.consume((Integer x) -> { sum.getAndAdd(x.intValue()); });
//        int n = 20;
//        for (int i = 1; i <= n; ++i)
//            p.submit(i);
//        p.close();
//        f.join();
//        assertEquals((n * (n + 1)) / 2, sum.get());
//    }
//
//    /**
//     * consume(null) throws NPE
//     */
//    public void testConsumeNPE() {
//        SubmissionPublisher<Integer> p = basicPublisher();
//        try {
//            CompletableFuture<Void> f = p.consume(null);
//            shouldThrow();
//        } catch (NullPointerException success) {}
//    }
//
//    /**
//     * consume eventually stops processing published items if cancelled
//     */
//    public void testCancelledConsume() {
//        AtomicInteger count = new AtomicInteger();
//        SubmissionPublisher<Integer> p = basicPublisher();
//        CompletableFuture<Void> f = p.consume(x -> count.getAndIncrement());
//        f.cancel(true);
//        int n = 1000000; // arbitrary limit
//        for (int i = 1; i <= n; ++i)
//            p.submit(i);
//        assertTrue(count.get() < n);
//    }

}
